package com.jhf.youke.flink.domain.service;

import lombok.extern.log4j.Log4j2;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.stereotype.Component;

/**
 * Builds and runs a Flink streaming job that forwards string messages from one
 * Pulsar topic to another without transforming them.
 *
 * <p>The Pulsar service URL and admin URL are not assigned by the constructor.
 * They must be supplied through {@link #setServiceUrl(String)} and
 * {@link #setAdminUrl(String)} before {@link #wordCount()} is called.
 *
 * <p>Created: 2022/11/16
 *
 * @author RHJ
 */
@Component
@Log4j2
public class SourceService {
    /** Topic the source reads from. */
    private static final String SOURCE_TOPIC = "my-topic";
    /** Topic the sink writes to. */
    private static final String SINK_TOPIC = "topic1";
    /** Subscription name used by the source; the value is user-defined. */
    private static final String SUBSCRIPTION_NAME = "my-subscription";

    private final StreamExecutionEnvironment env;
    private String serviceUrl;
    private String adminUrl;

    /**
     * Creates the service and obtains the execution environment from
     * {@link StreamExecutionEnvironment#getExecutionEnvironment()}.
     */
    public SourceService() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
    }

    /**
     * Sets the Pulsar service URL used by both the source and the sink.
     *
     * @param serviceUrl the Pulsar service URL
     */
    public void setServiceUrl(String serviceUrl) {
        this.serviceUrl = serviceUrl;
    }

    /**
     * Sets the Pulsar admin URL used by both the source and the sink.
     *
     * @param adminUrl the Pulsar admin URL
     */
    public void setAdminUrl(String adminUrl) {
        this.adminUrl = adminUrl;
    }

    /**
     * Wires a Pulsar source to a Pulsar sink and executes the job.
     *
     * <p>Despite the method name, no counting is performed: every record read
     * from the source topic is written unchanged to the sink topic.
     *
     * @throws IllegalStateException if the service URL or admin URL has not been configured
     * @throws Exception if {@link StreamExecutionEnvironment#execute()} fails
     */
    public void wordCount() throws Exception {
        // Fail fast with a clear message: neither URL is assigned by the constructor,
        // so without the setters they would reach the Pulsar builders as null.
        requireConfigured(serviceUrl, "serviceUrl");
        requireConfigured(adminUrl, "adminUrl");

        // Reference: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/pulsar/
        // Create the PulsarSource, reading the topic from its earliest position.
        PulsarSource<String> source = PulsarSource.builder()
                .setServiceUrl(serviceUrl)
                .setAdminUrl(adminUrl)
                .setStartCursor(StartCursor.earliest())
                .setTopics(SOURCE_TOPIC)
                .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                .setSubscriptionName(SUBSCRIPTION_NAME)
                // Available subscription types include exclusive, shared and key_shared.
                .setSubscriptionType(SubscriptionType.Exclusive)
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");

        // Sink the records back to Pulsar, serialized as plain strings.
        PulsarSink<String> sink = PulsarSink.builder()
                .setServiceUrl(serviceUrl)
                .setAdminUrl(adminUrl)
                .setTopics(SINK_TOPIC)
                .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        stream.sinkTo(sink);

        env.execute();
    }

    /** Throws {@link IllegalStateException} when a required setting is null or blank. */
    private static void requireConfigured(String value, String name) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException(name + " must be configured before running the job");
        }
    }
}
